{
    "cells": [
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "<i>Copyright (c) Recommenders contributors.</i>\n",
                "\n",
                "<i>Licensed under the MIT License.</i>"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "# Apply Diversity Metrics  \n",
                "## -- Compare ALS and Random Recommenders on MovieLens (PySpark)\n",
                "\n",
                "In this notebook, we demonstrate how to evaluate a recommender using metrics other than commonly used rating/ranking metrics.\n",
                "\n",
                "Such metrics include:\n",
                "- Coverage - We use following two metrics defined by \\[Shani and Gunawardana\\]:\n",
                " \n",
                "    - (1) catalog_coverage, which measures the proportion of items that get recommended from the item catalog; \n",
                "    - (2) distributional_coverage, which measures how equally different items are recommended in the recommendations to all users.\n",
                "\n",
                "- Novelty - A more novel item indicates it is less popular, i.e. it gets recommended less frequently.\n",
                "We use the definition of novelty from \\[Castells et al.\\]\n",
                "\n",
                "- Diversity - The dissimilarity of items being recommended.\n",
                "We use a definition based on _intralist similarity_ by \\[Zhang et al.]\n",
                "\n",
                "- Serendipity - The \"unusualness\" or \"surprise\" of recommendations to a user.\n",
                "We use a definition based on cosine similarity by \\[Zhang et al.]\n",
                "\n",
                "We evaluate the results obtained with two approaches: using the ALS recommender algorithm vs. a baseline of random recommendations. \n",
                " - Matrix factorization by [ALS](https://spark.apache.org/docs/latest/api/python/_modules/pyspark/ml/recommendation.html#ALS) (Alternating Least Squares) is a well known collaborative filtering algorithm.\n",
                " - We also define a process which randomly recommends unseen items to each user. \n",
                " - We show two options to calculate item-item similarity: (1) based on item co-occurrence count; and (2) based on item feature vectors.\n",
                " \n",
                "The comparision results show that the ALS recommender outperforms the random recommender on ranking metrics (Precision@k, Recall@k, NDCG@k, and\tMean average precision), while the random recommender outperforms ALS recommender on diversity metrics. This is because ALS is optimized for estimating the item rating as accurate as possible, therefore it performs well on accuracy metrics including rating and ranking metrics. As a side effect, the items being recommended tend to be popular items, which are the items mostly sold or viewed. It leaves the [long-tail items](https://github.com/microsoft/recommenders/blob/main/GLOSSARY.md) having less chance to get introduced to the users. This is the reason why ALS is not performing as well as a random recommender on diversity metrics. \n",
                "\n",
                "From the algorithmic point of view, items in the tail suffer from the cold-start problem, making them hard for recommendation systems to use. However, from the business point of view, oftentimes the items in the tail can be highly profitable, since, depending on supply, business can apply a higher margin to them. Recommendation systems that optimize metrics like novelty and diversity, can help to find users willing to get these long tail items. Usually there is a trade-off between one type of metric vs. another. One should decide which set of metrics to optimize based on business scenarios."
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "**Coverage**\n",
                "\n",
                "We define _catalog coverage_ as the proportion of items showing in all users’ recommendations: \n",
                "$$\n",
                "\\textrm{CatalogCoverage} = \\frac{|N_r|}{|N_t|}\n",
                "$$\n",
                "where $N_r$ denotes the set of items in the recommendations (`reco_df` in the code below) and $N_t$ the set of items in the historical data (`train_df`).\n",
                "\n",
                "_Distributional coverage_ measures how equally different items are recommended to users when a particular recommender system is used.\n",
                "If  $p(i|R)$ denotes the probability that item $i$ is observed among all recommendation lists, we define distributional coverage as\n",
                "$$\n",
                "\\textrm{DistributionalCoverage} = -\\sum_{i \\in N_t} p(i|R) \\log_2 p(i)\n",
                "$$\n",
                "where \n",
                "$$\n",
                "p(i|R) = \\frac{|M_r (i)|}{|\\textrm{reco_df}|}\n",
                "$$\n",
                "and $M_r (i)$ denotes the users who are recommended item $i$.\n"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "\n",
                "**Diversity**\n",
                "\n",
                "Diversity represents the variety present in a list of recommendations.\n",
                "_Intra-List Similarity_ aggregates the pairwise similarity of all items in a set. A recommendation list with groups of very similar items will score a high intra-list similarity. Lower intra-list similarity indicates higher diversity.\n",
                "To measure similarity between any two items we use _cosine similarity_:\n",
                "$$\n",
                "\\textrm{Cosine Similarity}(i,j)=  \\frac{|M_t^{l(i,j)}|} {\\sqrt{|M_t^{l(i)}|} \\sqrt{|M_t^{l(j)}|} }\n",
                "$$\n",
                "where $M_t^{l(i)}$ denotes the set of users who liked item $i$ and $M_t^{l(i,j)}$ the users who liked both $i$ and $j$.\n",
                "Intra-list similarity is then defined as \n",
                "$$\n",
                "\\textrm{IL} = \\frac{1}{|M|} \\sum_{u \\in M} \\frac{1}{\\binom{N_r(u)}{2}} \\sum_{i,j \\in N_r (u),\\, i<j} \\textrm{Cosine Similarity}(i,j)\n",
                "$$\n",
                "where $M$ is the set of users and $N_r(u)$ the set of recommendations for user $u$. Finally, diversity is defined as\n",
                "$$\n",
                "\\textrm{diversity} = 1 - \\textrm{IL}\n",
                "$$\n"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "\n",
                "**Novelty**\n",
                "\n",
                "The novelty of an item is inverse to its _popularity_. If $p(i)$ represents the probability that item $i$ is observed (or known, interacted with etc.) by users, then  \n",
                "$$\n",
                "p(i) = \\frac{|M_t (i)|} {|\\textrm{train_df}|}\n",
                "$$\n",
                "where $M_t (i)$ is the set of users who have interacted with item $i$ in the historical data. \n",
                "\n",
                "The novelty of an item is then defined as\n",
                "$$\n",
                "\\textrm{novelty}(i) = -\\log_2 p(i)\n",
                "$$\n",
                "and the novelty of the recommendations across all users is defined as\n",
                "$$\n",
                "\\textrm{novelty} = \\sum_{i \\in N_r} \\frac{|M_r (i)|}{|\\textrm{reco_df}|} \\textrm{novelty}(i)\n",
                "$$\n"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "**Serendipity**\n",
                "\n",
                "Serendipity represents the “unusualness” or “surprise” of recommendations. Unlike novelty, serendipity encompasses the semantic content of items and can be imagined as the distance between recommended items and their expected contents (Zhang et al.) Lower cosine similarity indicates lower expectedness and higher serendipity.\n",
                "We define the expectedness of an unseen item $i$ for user $u$ as the average similarity between every already seen item $j$ in the historical data and $i$:\n",
                "$$\n",
                "\\textrm{expectedness}(i|u) = \\frac{1}{|N_t (u)|} \\sum_{j \\in N_t (u)} \\textrm{Cosine Similarity}(i,j)\n",
                "$$\n",
                "The serendipity of item $i$ is (1 - expectedness) multiplied by _relevance_, where relevance indicates whether the item turns out to be liked by the user or not. For example, in a binary scenario, if an item in `reco_df` is liked (purchased, clicked) in `test_df`, its relevance equals one, otherwise it equals zero. Aggregating over all users and items, the overall \n",
                "serendipity is defined as\n",
                "$$\n",
                "\\textrm{serendipity} = \\frac{1}{|M|} \\sum_{u \\in M_r}\n",
                "\\frac{1}{|N_r (u)|} \\sum_{i \\in N_r (u)} \\big(1 - \\textrm{expectedness}(i|u) \\big) \\, \\textrm{relevance}(i)\n",
                "$$\n"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "**Note**: This notebook requires a PySpark environment to run properly. Please follow the steps in [SETUP.md](https://github.com/Microsoft/Recommenders/blob/master/SETUP.md#dependencies-setup) to install the PySpark environment."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 1,
            "metadata": {},
            "outputs": [
                {
                    "name": "stdout",
                    "output_type": "stream",
                    "text": [
                        "System version: 3.8.0 (default, Nov  6 2019, 21:49:08) \n",
                        "[GCC 7.3.0]\n",
                        "Spark version: 3.2.0\n"
                    ]
                }
            ],
            "source": [
                "import warnings\n",
                "warnings.simplefilter(action='ignore', category=FutureWarning)\n",
                "\n",
                "import sys\n",
                "import numpy as np\n",
                "import pandas as pd\n",
                "\n",
                "import pyspark\n",
                "import pyspark.sql.functions as F\n",
                "from pyspark.sql.window import Window\n",
                "from pyspark.sql.types import FloatType, IntegerType, LongType, StructType, StructField\n",
                "from pyspark.ml.feature import Tokenizer, StopWordsRemover\n",
                "from pyspark.ml.feature import HashingTF, CountVectorizer, VectorAssembler\n",
                "from pyspark.ml.recommendation import ALS\n",
                "\n",
                "from recommenders.utils.timer import Timer\n",
                "from recommenders.datasets import movielens\n",
                "from recommenders.datasets.spark_splitters import spark_random_split\n",
                "from recommenders.evaluation.spark_evaluation import SparkRankingEvaluation, SparkDiversityEvaluation\n",
                "from recommenders.utils.spark_utils import start_or_get_spark\n",
                "\n",
                "%load_ext autoreload\n",
                "%autoreload 2\n",
                "\n",
                "print(\"System version: {}\".format(sys.version))\n",
                "print(\"Spark version: {}\".format(pyspark.__version__))\n"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "\n",
                "Set the default parameters."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 2,
            "metadata": {
                "tags": [
                    "parameters"
                ]
            },
            "outputs": [],
            "source": [
                "# top k items to recommend\n",
                "TOP_K = 10\n",
                "\n",
                "# Select MovieLens data size: 100k, 1m, 10m, or 20m\n",
                "MOVIELENS_DATA_SIZE = '100k'\n",
                "\n",
                "# user, item column names\n",
                "COL_USER=\"UserId\"\n",
                "COL_ITEM=\"MovieId\"\n",
                "COL_RATING=\"Rating\"\n",
                "COL_TITLE=\"Title\"\n",
                "COL_GENRE=\"Genre\""
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "### 1. Set up Spark context\n",
                "\n",
                "The following settings work well for debugging locally on VM - change when running on a cluster. We set up a giant single executor with many threads and specify memory cap. "
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 3,
            "metadata": {},
            "outputs": [],
            "source": [
                "# the following settings work well for debugging locally on VM - change when running on a cluster\n",
                "# set up a giant single executor with many threads and specify memory cap\n",
                "\n",
                "spark = start_or_get_spark(\"ALS PySpark\", memory=\"16g\")\n",
                "spark.conf.set(\"spark.sql.analyzer.failAmbiguousSelfJoin\", \"false\")\n",
                "spark.conf.set(\"spark.sql.crossJoin.enabled\", \"true\")"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "### 2. Download the MovieLens dataset"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 4,
            "metadata": {},
            "outputs": [
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 4.81k/4.81k [00:05<00:00, 862KB/s]\n",
                        "                                                                                \r"
                    ]
                },
                {
                    "name": "stdout",
                    "output_type": "stream",
                    "text": [
                        "+-------+------+------+---------+--------------------+------+\n",
                        "|MovieId|UserId|Rating|Timestamp|               Title| Genre|\n",
                        "+-------+------+------+---------+--------------------+------+\n",
                        "|     26|   138|   5.0|879024232|Brothers McMullen...|Comedy|\n",
                        "|     26|   224|   3.0|888104153|Brothers McMullen...|Comedy|\n",
                        "|     26|    18|   4.0|880129731|Brothers McMullen...|Comedy|\n",
                        "|     26|   222|   3.0|878183043|Brothers McMullen...|Comedy|\n",
                        "|     26|    43|   5.0|883954901|Brothers McMullen...|Comedy|\n",
                        "|     26|   201|   4.0|884111927|Brothers McMullen...|Comedy|\n",
                        "|     26|   299|   4.0|878192601|Brothers McMullen...|Comedy|\n",
                        "|     26|    95|   3.0|880571951|Brothers McMullen...|Comedy|\n",
                        "|     26|    89|   3.0|879459909|Brothers McMullen...|Comedy|\n",
                        "|     26|   361|   3.0|879440941|Brothers McMullen...|Comedy|\n",
                        "|     26|   194|   3.0|879522240|Brothers McMullen...|Comedy|\n",
                        "|     26|   391|   5.0|877399745|Brothers McMullen...|Comedy|\n",
                        "|     26|   345|   3.0|884993555|Brothers McMullen...|Comedy|\n",
                        "|     26|   303|   4.0|879468307|Brothers McMullen...|Comedy|\n",
                        "|     26|   401|   3.0|891033395|Brothers McMullen...|Comedy|\n",
                        "|     26|   429|   3.0|882386333|Brothers McMullen...|Comedy|\n",
                        "|     26|   293|   3.0|888907015|Brothers McMullen...|Comedy|\n",
                        "|     26|   270|   5.0|876954995|Brothers McMullen...|Comedy|\n",
                        "|     26|   442|   3.0|883388576|Brothers McMullen...|Comedy|\n",
                        "|     26|   342|   2.0|875320037|Brothers McMullen...|Comedy|\n",
                        "+-------+------+------+---------+--------------------+------+\n",
                        "only showing top 20 rows\n",
                        "\n"
                    ]
                }
            ],
            "source": [
                "# Note: The DataFrame-based API for ALS currently only supports integers for user and item ids.\n",
                "schema = StructType(\n",
                "    (\n",
                "        StructField(COL_USER, IntegerType()),\n",
                "        StructField(COL_ITEM, IntegerType()),\n",
                "        StructField(COL_RATING, FloatType()),\n",
                "        StructField(\"Timestamp\", LongType()),\n",
                "    )\n",
                ")\n",
                "\n",
                "data = movielens.load_spark_df(spark, size=MOVIELENS_DATA_SIZE, schema=schema, title_col=COL_TITLE, genres_col=COL_GENRE)\n",
                "data.show()"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "#### Split the data using the Spark random splitter provided in utilities"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 5,
            "metadata": {},
            "outputs": [
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "                                                                                \r"
                    ]
                },
                {
                    "name": "stdout",
                    "output_type": "stream",
                    "text": [
                        "N train_df 75147\n"
                    ]
                },
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "[Stage 19:================================================>     (178 + 3) / 200]\r"
                    ]
                },
                {
                    "name": "stdout",
                    "output_type": "stream",
                    "text": [
                        "N test_df 24853\n"
                    ]
                },
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "\r\n",
                        "                                                                                \r"
                    ]
                }
            ],
            "source": [
                "train_df, test_df = spark_random_split(data.select(COL_USER, COL_ITEM, COL_RATING), ratio=0.75, seed=123)\n",
                "print (\"N train_df\", train_df.cache().count())\n",
                "print (\"N test_df\", test_df.cache().count())"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "#### Get all possible user-item pairs"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "Note: We assume that training data contains all users and all catalog items. "
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 6,
            "metadata": {},
            "outputs": [],
            "source": [
                "users = train_df.select(COL_USER).distinct()\n",
                "items = train_df.select(COL_ITEM).distinct()\n",
                "user_item = users.crossJoin(items)"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "### 3. Train the ALS model on the training data, and get the top-k recommendations for our testing data\n",
                "\n",
                "To predict movie ratings, we use the rating data in the training set as users' explicit feedback. The hyperparameters used in building the model are referenced from [here](http://mymedialite.net/examples/datasets.html). We do not constrain the latent factors (`nonnegative = False`) in order to allow for both positive and negative preferences towards movies.\n",
                "Timing will vary depending on the machine being used to train."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 7,
            "metadata": {},
            "outputs": [],
            "source": [
                "header = {\n",
                "    \"userCol\": COL_USER,\n",
                "    \"itemCol\": COL_ITEM,\n",
                "    \"ratingCol\": COL_RATING,\n",
                "}\n",
                "\n",
                "\n",
                "als = ALS(\n",
                "    rank=10,\n",
                "    maxIter=15,\n",
                "    implicitPrefs=False,\n",
                "    regParam=0.05,\n",
                "    coldStartStrategy='drop',\n",
                "    nonnegative=False,\n",
                "    seed=42,\n",
                "    **header\n",
                ")"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 8,
            "metadata": {},
            "outputs": [
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "                                                                                \r"
                    ]
                },
                {
                    "name": "stdout",
                    "output_type": "stream",
                    "text": [
                        "Took 10.75137371000028 seconds for training.\n"
                    ]
                }
            ],
            "source": [
                "with Timer() as train_time:\n",
                "    model = als.fit(train_df)\n",
                "\n",
                "print(\"Took {} seconds for training.\".format(train_time.interval))"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "In the movie recommendation use case, recommending movies that have been rated by the users does not make sense. Therefore, the rated movies are removed from the recommended items.\n",
                "\n",
                "In order to achieve this, we recommend all movies to all users, and then remove the user-movie pairs that exist in the training dataset."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 9,
            "metadata": {},
            "outputs": [
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "                                                                                \r"
                    ]
                },
                {
                    "name": "stdout",
                    "output_type": "stream",
                    "text": [
                        "1464772\n"
                    ]
                },
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "[Stage 235:>                                                        (0 + 2) / 2]\r"
                    ]
                },
                {
                    "name": "stdout",
                    "output_type": "stream",
                    "text": [
                        "9430\n"
                    ]
                },
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "\r\n",
                        "                                                                                \r"
                    ]
                }
            ],
            "source": [
                "# Score all user-item pairs\n",
                "dfs_pred = model.transform(user_item)\n",
                "\n",
                "# Remove seen items.\n",
                "dfs_pred_exclude_train = dfs_pred.alias(\"pred\").join(\n",
                "    train_df.alias(\"train\"),\n",
                "    (dfs_pred[COL_USER] == train_df[COL_USER]) & (dfs_pred[COL_ITEM] == train_df[COL_ITEM]),\n",
                "    how='outer'\n",
                ")\n",
                "\n",
                "top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[\"train.Rating\"].isNull()) \\\n",
                "    .select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + \"prediction\")\n",
                "\n",
                "print(top_all.count())\n",
                "    \n",
                "window = Window.partitionBy(COL_USER).orderBy(F.col(\"prediction\").desc())    \n",
                "top_k_reco = top_all.select(\"*\", F.row_number().over(window).alias(\"rank\")).filter(F.col(\"rank\") <= TOP_K).drop(\"rank\")\n",
                " \n",
                "print(top_k_reco.count())"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "### 4. Random Recommender\n",
                "\n",
                "We define a recommender which randomly recommends unseen items to each user. "
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 10,
            "metadata": {},
            "outputs": [],
            "source": [
                "# random recommender\n",
                "window = Window.partitionBy(COL_USER).orderBy(F.rand())\n",
                "\n",
                "# randomly generated recommendations for each user\n",
                "pred_df = (\n",
                "  train_df\n",
                "  # join training data with all possible user-item pairs (seen in training)\n",
                "  .join(user_item,\n",
                "        on=[COL_USER, COL_ITEM],\n",
                "        how=\"right\"\n",
                "  )\n",
                "  # get user-item pairs that were not seen in the training data\n",
                "  .filter(F.col(COL_RATING).isNull())\n",
                "  # count items for each user (randomly sorting them)\n",
                "  .withColumn(\"score\", F.row_number().over(window))\n",
                "  # get the top k items per user\n",
                "  .filter(F.col(\"score\") <= TOP_K)\n",
                "  .drop(COL_RATING)\n",
                ")"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "### 5. ALS vs Random Recommenders Performance Comparison"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 11,
            "metadata": {},
            "outputs": [],
            "source": [
                "def get_ranking_results(ranking_eval):\n",
                "    metrics = {\n",
                "        \"Precision@k\": ranking_eval.precision_at_k(),\n",
                "        \"Recall@k\": ranking_eval.recall_at_k(),\n",
                "        \"NDCG@k\": ranking_eval.ndcg_at_k(),\n",
                "        \"Mean average precision\": ranking_eval.map_at_k()\n",
                "      \n",
                "    }\n",
                "    return metrics   \n",
                "\n",
                "def get_diversity_results(diversity_eval):\n",
                "    metrics = {\n",
                "        \"catalog_coverage\":diversity_eval.catalog_coverage(),\n",
                "        \"distributional_coverage\":diversity_eval.distributional_coverage(), \n",
                "        \"novelty\": diversity_eval.novelty(), \n",
                "        \"diversity\": diversity_eval.diversity(), \n",
                "        \"serendipity\": diversity_eval.serendipity()\n",
                "    }\n",
                "    return metrics "
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 12,
            "metadata": {},
            "outputs": [],
            "source": [
                "def generate_summary(data, algo, k, ranking_metrics, diversity_metrics):\n",
                "    summary = {\"Data\": data, \"Algo\": algo, \"K\": k}\n",
                "\n",
                "    if ranking_metrics is None:\n",
                "        ranking_metrics = {           \n",
                "            \"Precision@k\": np.nan,\n",
                "            \"Recall@k\": np.nan,            \n",
                "            \"nDCG@k\": np.nan,\n",
                "            \"MAP\": np.nan,\n",
                "        }\n",
                "    summary.update(ranking_metrics)\n",
                "    summary.update(diversity_metrics)\n",
                "    return summary"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "#### ALS Recommender Performance Results"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 13,
            "metadata": {},
            "outputs": [
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "                                                                                \r"
                    ]
                }
            ],
            "source": [
                "als_ranking_eval = SparkRankingEvaluation(\n",
                "    test_df, \n",
                "    top_all, \n",
                "    k = TOP_K, \n",
                "    col_user=COL_USER, \n",
                "    col_item=COL_ITEM,\n",
                "    col_rating=COL_RATING, \n",
                "    col_prediction=\"prediction\",\n",
                "    relevancy_method=\"top_k\"\n",
                ")\n",
                "\n",
                "als_ranking_metrics = get_ranking_results(als_ranking_eval)"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 14,
            "metadata": {},
            "outputs": [
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "                                                                                \r"
                    ]
                }
            ],
            "source": [
                "als_diversity_eval = SparkDiversityEvaluation(\n",
                "    train_df = train_df, \n",
                "    reco_df = top_k_reco,\n",
                "    col_user = COL_USER, \n",
                "    col_item = COL_ITEM\n",
                ")\n",
                "\n",
                "als_diversity_metrics = get_diversity_results(als_diversity_eval)"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 15,
            "metadata": {},
            "outputs": [],
            "source": [
                "als_results = generate_summary(MOVIELENS_DATA_SIZE, \"als\", TOP_K, als_ranking_metrics, als_diversity_metrics)"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "#### Random Recommender Performance Results"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 16,
            "metadata": {},
            "outputs": [
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "                                                                                \r"
                    ]
                }
            ],
            "source": [
                "random_ranking_eval = SparkRankingEvaluation(\n",
                "    test_df,\n",
                "    pred_df,\n",
                "    col_user=COL_USER,\n",
                "    col_item=COL_ITEM,\n",
                "    col_rating=COL_RATING,\n",
                "    col_prediction=\"score\",\n",
                "    k=TOP_K,\n",
                ")\n",
                "\n",
                "random_ranking_metrics = get_ranking_results(random_ranking_eval)"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 17,
            "metadata": {},
            "outputs": [
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "                                                                                \r"
                    ]
                }
            ],
            "source": [
                "random_diversity_eval = SparkDiversityEvaluation(\n",
                "    train_df=train_df,\n",
                "    reco_df=pred_df,\n",
                "    col_user=COL_USER,\n",
                "    col_item=COL_ITEM,\n",
                ")\n",
                "\n",
                "random_diversity_metrics = get_diversity_results(random_diversity_eval)"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 18,
            "metadata": {},
            "outputs": [],
            "source": [
                "random_results = generate_summary(MOVIELENS_DATA_SIZE, \"random\", TOP_K, random_ranking_metrics, random_diversity_metrics)"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "#### Result Comparison"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 19,
            "metadata": {},
            "outputs": [],
            "source": [
                "cols = [\"Data\", \"Algo\", \"K\", \"Precision@k\", \"Recall@k\", \"NDCG@k\", \"Mean average precision\", \"catalog_coverage\", \"distributional_coverage\", \"novelty\", \"diversity\", \"serendipity\"]\n",
                "df_results = pd.DataFrame(columns=cols)\n",
                "\n",
                "df_results.loc[1] = als_results\n",
                "df_results.loc[2] = random_results"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 20,
            "metadata": {},
            "outputs": [
                {
                    "data": {
                        "text/html": [
                            "<div>\n",
                            "<style scoped>\n",
                            "    .dataframe tbody tr th:only-of-type {\n",
                            "        vertical-align: middle;\n",
                            "    }\n",
                            "\n",
                            "    .dataframe tbody tr th {\n",
                            "        vertical-align: top;\n",
                            "    }\n",
                            "\n",
                            "    .dataframe thead th {\n",
                            "        text-align: right;\n",
                            "    }\n",
                            "</style>\n",
                            "<table border=\"1\" class=\"dataframe\">\n",
                            "  <thead>\n",
                            "    <tr style=\"text-align: right;\">\n",
                            "      <th></th>\n",
                            "      <th>Data</th>\n",
                            "      <th>Algo</th>\n",
                            "      <th>K</th>\n",
                            "      <th>Precision@k</th>\n",
                            "      <th>Recall@k</th>\n",
                            "      <th>NDCG@k</th>\n",
                            "      <th>Mean average precision</th>\n",
                            "      <th>catalog_coverage</th>\n",
                            "      <th>distributional_coverage</th>\n",
                            "      <th>novelty</th>\n",
                            "      <th>diversity</th>\n",
                            "      <th>serendipity</th>\n",
                            "    </tr>\n",
                            "  </thead>\n",
                            "  <tbody>\n",
                            "    <tr>\n",
                            "      <th>1</th>\n",
                            "      <td>100k</td>\n",
                            "      <td>als</td>\n",
                            "      <td>10</td>\n",
                            "      <td>0.044374</td>\n",
                            "      <td>0.015567</td>\n",
                            "      <td>0.040657</td>\n",
                            "      <td>0.004202</td>\n",
                            "      <td>0.374158</td>\n",
                            "      <td>7.989889</td>\n",
                            "      <td>11.740626</td>\n",
                            "      <td>0.890659</td>\n",
                            "      <td>0.879359</td>\n",
                            "    </tr>\n",
                            "    <tr>\n",
                            "      <th>2</th>\n",
                            "      <td>100k</td>\n",
                            "      <td>random</td>\n",
                            "      <td>10</td>\n",
                            "      <td>0.018259</td>\n",
                            "      <td>0.006516</td>\n",
                            "      <td>0.018537</td>\n",
                            "      <td>0.002038</td>\n",
                            "      <td>0.998775</td>\n",
                            "      <td>10.543160</td>\n",
                            "      <td>12.180267</td>\n",
                            "      <td>0.923302</td>\n",
                            "      <td>0.892897</td>\n",
                            "    </tr>\n",
                            "  </tbody>\n",
                            "</table>\n",
                            "</div>"
                        ],
                        "text/plain": [
                            "   Data    Algo   K  Precision@k  Recall@k    NDCG@k  Mean average precision  \\\n",
                            "1  100k     als  10     0.044374  0.015567  0.040657                0.004202   \n",
                            "2  100k  random  10     0.018259  0.006516  0.018537                0.002038   \n",
                            "\n",
                            "   catalog_coverage  distributional_coverage    novelty  diversity  \\\n",
                            "1          0.374158                 7.989889  11.740626   0.890659   \n",
                            "2          0.998775                10.543160  12.180267   0.923302   \n",
                            "\n",
                            "   serendipity  \n",
                            "1     0.879359  \n",
                            "2     0.892897  "
                        ]
                    },
                    "execution_count": 20,
                    "metadata": {},
                    "output_type": "execute_result"
                }
            ],
            "source": [
                "df_results"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "#### Conclusion\n",
                "The comparison shows that the ALS recommender outperforms the random recommender on all ranking metrics (Precision@k, Recall@k, NDCG@k, and Mean average precision), while the random recommender outperforms ALS on the diversity metrics (coverage, novelty, diversity, and serendipity). ALS is optimized to estimate item ratings as accurately as possible, so it does well on accuracy metrics, including rating and ranking metrics. As a side effect, it tends to recommend popular items, that is, the items most often sold or viewed, which leaves less popular long-tail items with little chance of being shown to users. This is why ALS does not perform as well as the random recommender on diversity metrics."
            ]
        },
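        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "To make the coverage gap concrete, the toy sketch below computes catalog coverage (the share of catalog items recommended at least once) in plain Python for a hypothetical popularity-biased recommender and a hypothetical spread-out one. The data and the names `toy_catalog`, `popular_recos`, `spread_recos`, and `toy_catalog_coverage` are illustrative assumptions, not outputs of this notebook, and the function is a simplified stand-in rather than the `SparkDiversityEvaluation` implementation."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "# Toy illustration only: hypothetical data, not taken from MovieLens.\n",
                "toy_catalog = {\"A\", \"B\", \"C\", \"D\", \"E\", \"F\"}\n",
                "\n",
                "# Each user maps to a top-2 recommendation list.\n",
                "popular_recos = {\"u1\": [\"A\", \"B\"], \"u2\": [\"A\", \"B\"], \"u3\": [\"A\", \"C\"]}\n",
                "spread_recos = {\"u1\": [\"A\", \"D\"], \"u2\": [\"B\", \"E\"], \"u3\": [\"C\", \"F\"]}\n",
                "\n",
                "\n",
                "def toy_catalog_coverage(recos, catalog):\n",
                "    \"\"\"Fraction of catalog items that appear in at least one recommendation list.\"\"\"\n",
                "    recommended = {item for items in recos.values() for item in items}\n",
                "    return len(recommended & catalog) / len(catalog)\n",
                "\n",
                "\n",
                "print(toy_catalog_coverage(popular_recos, toy_catalog))  # 3 of 6 items -> 0.5\n",
                "print(toy_catalog_coverage(spread_recos, toy_catalog))  # 6 of 6 items -> 1.0"
            ]
        },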
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "### 6.  Calculate diversity metrics using item feature vector based item-item similarity\n",
                "In the section above, we calculated diversity metrics using item-item similarity based on item co-occurrence counts. When item features are available, we may instead want to compute item-item similarity from item feature vectors. This section shows how to calculate diversity metrics with feature-vector-based item-item similarity."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 21,
            "metadata": {},
            "outputs": [],
            "source": [
                "# Get movie features \"title\" and \"genres\"\n",
                "movies = (\n",
                "    data.groupBy(COL_ITEM, COL_TITLE, COL_GENRE).count()\n",
                "    .na.drop()  # remove rows with null values\n",
                "    .withColumn(COL_GENRE, F.split(F.col(COL_GENRE), r\"\\|\"))  # convert to array of genres\n",
                "    .withColumn(COL_TITLE, F.regexp_replace(F.col(COL_TITLE), r\"[\\(),:^0-9]\", \"\"))  # strip digits (incl. year) and the characters ( ) , : ^ from title\n",
                "    .drop(\"count\")  # remove unused columns\n",
                ")"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 22,
            "metadata": {},
            "outputs": [],
            "source": [
                "# tokenize \"title\" column\n",
                "title_tokenizer = Tokenizer(inputCol=COL_TITLE, outputCol=\"title_words\")\n",
                "tokenized_data = title_tokenizer.transform(movies)\n",
                "\n",
                "# remove stop words\n",
                "remover = StopWordsRemover(inputCol=\"title_words\", outputCol=\"text\")\n",
                "clean_data = remover.transform(tokenized_data).drop(COL_TITLE, \"title_words\")"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 23,
            "metadata": {},
            "outputs": [
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "[Stage 1441:============================================>       (172 + 2) / 200]\r"
                    ]
                },
                {
                    "name": "stdout",
                    "output_type": "stream",
                    "text": [
                        "+-------+------------------------------------------------------------------------------------+\n",
                        "|MovieId|features                                                                            |\n",
                        "+-------+------------------------------------------------------------------------------------+\n",
                        "|29     |(1043,[158,269,1025,1026,1029,1031],[1.0,1.0,1.0,1.0,1.0,1.0])                      |\n",
                        "|26     |(1043,[54,139,1025],[1.0,1.0,1.0])                                                  |\n",
                        "|1677   |(1043,[260,902,1024],[1.0,1.0,1.0])                                                 |\n",
                        "|964    |(1043,[416,429,1024,1025],[1.0,1.0,1.0,1.0])                                        |\n",
                        "|474    |(1043,[112,302,329,517,540,787,933,1032,1034],[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])|\n",
                        "|1258   |(1043,[114,799,1025,1028],[1.0,1.0,1.0,1.0])                                        |\n",
                        "|541    |(1043,[635,910,1026,1029],[1.0,1.0,1.0,1.0])                                        |\n",
                        "|1224   |(1043,[978,1024],[1.0,1.0])                                                         |\n",
                        "|558    |(1043,[231,524,1024,1027,1041],[1.0,1.0,1.0,1.0,1.0])                               |\n",
                        "|191    |(1043,[206,1024,1035],[1.0,1.0,1.0])                                                |\n",
                        "+-------+------------------------------------------------------------------------------------+\n",
                        "only showing top 10 rows\n",
                        "\n"
                    ]
                },
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "\r\n",
                        "                                                                                \r"
                    ]
                }
            ],
            "source": [
                "# convert text input into feature vectors\n",
                "\n",
                "# step 1: perform HashingTF on column \"text\"\n",
                "text_hasher = HashingTF(inputCol=\"text\", outputCol=\"text_features\", numFeatures=1024)\n",
                "hashed_data = text_hasher.transform(clean_data)\n",
                "\n",
                "# step 2: fit a CountVectorizerModel from column \"genres\".\n",
                "count_vectorizer = CountVectorizer(inputCol=COL_GENRE, outputCol=\"genres_features\")\n",
                "count_vectorizer_model = count_vectorizer.fit(hashed_data)\n",
                "vectorized_data = count_vectorizer_model.transform(hashed_data)\n",
                "\n",
                "# step 3: assemble features into a single vector\n",
                "assembler = VectorAssembler(\n",
                "    inputCols=[\"text_features\", \"genres_features\"],\n",
                "    outputCol=\"features\",\n",
                ")\n",
                "feature_data = assembler.transform(vectorized_data).select(COL_ITEM, \"features\")\n",
                "\n",
                "feature_data.show(10, False)"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "The *features* column holds a SparseVector object. For example, in the feature vector (1043,[54,139,1025],[1.0,1.0,1.0]) shown above for MovieId 26, 1043 is the vector length: 1024 hashed title features followed by 19 genre features. The values at index positions 54, 139, and 1025 are 1.0, and the values at all other positions are 0."
            ]
        },
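        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "As a rough sketch of how two such sparse vectors can be compared, the cell below stores two of the vectors printed above as index-to-value dictionaries and computes their cosine similarity in plain Python. The helper name `toy_cosine_similarity` is hypothetical, and cosine similarity is assumed here as the comparison; the cell illustrates the idea only and is not the code path used by `SparkDiversityEvaluation`."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "import math\n",
                "\n",
                "# Sparse vectors as {index: value}; indices copied from the output above\n",
                "# for MovieId 26 and MovieId 964 (all stored values are 1.0).\n",
                "vec_26 = {54: 1.0, 139: 1.0, 1025: 1.0}\n",
                "vec_964 = {416: 1.0, 429: 1.0, 1024: 1.0, 1025: 1.0}\n",
                "\n",
                "\n",
                "def toy_cosine_similarity(a, b):\n",
                "    \"\"\"Cosine similarity of two sparse vectors stored as index -> value dicts.\"\"\"\n",
                "    dot = sum(value * b[index] for index, value in a.items() if index in b)\n",
                "    norm_a = math.sqrt(sum(v * v for v in a.values()))\n",
                "    norm_b = math.sqrt(sum(v * v for v in b.values()))\n",
                "    return dot / (norm_a * norm_b)\n",
                "\n",
                "\n",
                "# The two movies share only index 1025, so the dot product is 1.\n",
                "toy_sim = toy_cosine_similarity(vec_26, vec_964)\n",
                "print(round(toy_sim, 4))  # 1 / (sqrt(3) * 2), about 0.2887"
            ]
        },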
        {
            "cell_type": "code",
            "execution_count": 24,
            "metadata": {},
            "outputs": [
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "                                                                                \r"
                    ]
                },
                {
                    "name": "stdout",
                    "output_type": "stream",
                    "text": [
                        "0.8742459916963194\n",
                        "0.8891175823541189\n"
                    ]
                }
            ],
            "source": [
                "als_eval = SparkDiversityEvaluation(\n",
                "    train_df=train_df,\n",
                "    reco_df=top_k_reco,\n",
                "    item_feature_df=feature_data,\n",
                "    item_sim_measure=\"item_feature_vector\",\n",
                "    col_user=COL_USER,\n",
                "    col_item=COL_ITEM,\n",
                ")\n",
                "\n",
                "als_diversity = als_eval.diversity()\n",
                "als_serendipity = als_eval.serendipity()\n",
                "print(als_diversity)\n",
                "print(als_serendipity)"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 25,
            "metadata": {},
            "outputs": [
                {
                    "name": "stderr",
                    "output_type": "stream",
                    "text": [
                        "                                                                                \r"
                    ]
                },
                {
                    "name": "stdout",
                    "output_type": "stream",
                    "text": [
                        "0.896073781038039\n",
                        "0.8925253230847529\n"
                    ]
                }
            ],
            "source": [
                "random_eval = SparkDiversityEvaluation(\n",
                "    train_df=train_df,\n",
                "    reco_df=pred_df,\n",
                "    item_feature_df=feature_data,\n",
                "    item_sim_measure=\"item_feature_vector\",\n",
                "    col_user=COL_USER,\n",
                "    col_item=COL_ITEM,\n",
                ")\n",
                "\n",
                "random_diversity = random_eval.diversity()\n",
                "random_serendipity = random_eval.serendipity()\n",
                "print(random_diversity)\n",
                "print(random_serendipity)"
            ]
        },
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
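                "The diversity and serendipity values change when a different item-item similarity measure is used, for both ALS and the random recommender. Moving from co-occurrence to feature-vector similarity, diversity drops from about 0.891 to 0.874 for ALS and from about 0.923 to 0.896 for the random recommender, while serendipity rises for ALS (about 0.879 to 0.889) and stays roughly unchanged for the random recommender (about 0.893). With feature-vector similarity, the random recommender still scores higher than ALS on both metrics."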
            ]
        },
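        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "The toy sketch below illustrates why the choice of similarity measure matters: the same recommendation list gets a different diversity score under two similarity tables. It assumes diversity is one minus the mean pairwise (intra-list) similarity of a list; the similarity values and the names `toy_list`, `cooccurrence_sim`, `feature_sim`, and `toy_list_diversity` are hypothetical and are not computed from MovieLens."
            ]
        },
        {
            "cell_type": "code",
            "execution_count": null,
            "metadata": {},
            "outputs": [],
            "source": [
                "from itertools import combinations\n",
                "\n",
                "toy_list = [\"A\", \"B\", \"C\"]\n",
                "\n",
                "# Two hypothetical similarity tables for the same item pairs.\n",
                "cooccurrence_sim = {(\"A\", \"B\"): 0.2, (\"A\", \"C\"): 0.1, (\"B\", \"C\"): 0.3}\n",
                "feature_sim = {(\"A\", \"B\"): 0.5, (\"A\", \"C\"): 0.4, (\"B\", \"C\"): 0.6}\n",
                "\n",
                "\n",
                "def toy_list_diversity(items, pair_sim):\n",
                "    \"\"\"One minus the mean pairwise similarity of the items in one list.\"\"\"\n",
                "    pairs = list(combinations(sorted(items), 2))\n",
                "    mean_sim = sum(pair_sim[pair] for pair in pairs) / len(pairs)\n",
                "    return 1.0 - mean_sim\n",
                "\n",
                "\n",
                "print(round(toy_list_diversity(toy_list, cooccurrence_sim), 2))  # mean similarity 0.2 -> 0.8\n",
                "print(round(toy_list_diversity(toy_list, feature_sim), 2))  # mean similarity 0.5 -> 0.5"
            ]
        },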
        {
            "cell_type": "markdown",
            "metadata": {},
            "source": [
                "### References\n",
                "The metric definitions / formulations are based on the following references:\n",
                "- P. Castells, S. Vargas, and J. Wang, Novelty and diversity metrics for recommender systems: choice, discovery and relevance, ECIR 2011.\n",
                "- G. Shani and A. Gunawardana, Evaluating recommendation systems, Recommender Systems Handbook, pp. 257-297, 2010.\n",
                "- E. Yan, Serendipity: Accuracy’s unpopular best friend in recommender systems, eugeneyan.com, April 2020.\n",
                "- Y.C. Zhang, D.Ó. Séaghdha, D. Quercia, and T. Jambor, Auralist: introducing serendipity into music recommendation, WSDM 2012.\n"
            ]
        },
        {
            "cell_type": "code",
            "execution_count": 26,
            "metadata": {},
            "outputs": [],
            "source": [
                "# cleanup spark instance\n",
                "spark.stop()"
            ]
        }
    ],
    "metadata": {
        "interpreter": {
            "hash": "7ec2189bea0434770dca7423a25e631e1cca9c4e2b4ff137a82f4dff32ac9607"
        },
        "kernelspec": {
            "display_name": "Python 3 (ipykernel)",
            "language": "python",
            "name": "python3"
        },
        "language_info": {
            "codemirror_mode": {
                "name": "ipython",
                "version": 3
            },
            "file_extension": ".py",
            "mimetype": "text/x-python",
            "name": "python",
            "nbconvert_exporter": "python",
            "pygments_lexer": "ipython3",
            "version": "3.8.0"
        }
    },
    "nbformat": 4,
    "nbformat_minor": 1
}
